[[features]]
= Features

Most of the features are available both for the `@RetryableTopic` annotation and the `RetryTopicConfiguration` beans.

[[backoff-configuration]]
== BackOff Configuration

The BackOff configuration relies on the `BackOffPolicy` interface from the `Spring Retry` project.

It includes:

* Fixed Back Off
* Exponential Back Off
* Random Exponential Back Off
* Uniform Random Back Off
* No Back Off
* Custom Back Off

[source, java]
----
@RetryableTopic(attempts = "5",
    backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
----

[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3_000)
            .maxAttempts(4)
            .create(template);
}
----

You can also provide a custom implementation of Spring Retry's `SleepingBackOffPolicy` interface:

[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .customBackoff(new MyCustomBackOffPolicy())
            .maxAttempts(5)
            .create(template);
}
----

NOTE: The default back off policy is `FixedBackOffPolicy` with a maximum of 3 attempts and 1000ms intervals.

NOTE: There is a 30-second default maximum delay for the `ExponentialBackOffPolicy`.
If your back off policy requires delays with values bigger than that, adjust the `maxDelay` property accordingly.

IMPORTANT: The first attempt counts against `maxAttempts`, so if you provide a `maxAttempts` value of 4 there'll be the original attempt plus 3 retries.
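
The interaction between `maxAttempts`, the multiplier, and `maxDelay` can be illustrated with a minimal plain-Java sketch.
It is not the framework's implementation: `BackOffSketch` and `retryDelays` are hypothetical names, and the calculation assumes the usual capped exponential scheme in which each delay is the previous one times the multiplier, limited to `maxDelay`.

[source, java]
----
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only; not part of Spring Kafka.
final class BackOffSketch {

    // Returns the delay (in ms) applied before each retry.
    // The original delivery counts as attempt 1, so there are maxAttempts - 1 retries.
    static List<Long> retryDelays(int maxAttempts, long delay, double multiplier, long maxDelay) {
        List<Long> delays = new ArrayList<>();
        double current = delay;
        for (int retry = 1; retry < maxAttempts; retry++) {
            // Cap every delay at maxDelay.
            delays.add(Math.min((long) current, maxDelay));
            current *= multiplier;
        }
        return delays;
    }
}
----

With the values from the annotation example above (`attempts` of 5, `delay` of 1000, `multiplier` of 2, `maxDelay` of 5000), the sketch yields four retry delays: 1000, 2000, 4000, and 5000 ms.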

[[global-timeout]]
== Global Timeout

You can set the global timeout for the retrying process.
If that time is reached, the next time the consumer throws an exception the message goes straight to the DLT, or just ends the processing if no DLT is available.

[source, java]
----
@RetryableTopic(backoff = @Backoff(2_000), timeout = "5000")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
----

[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(2_000)
            .timeoutAfter(5_000)
            .create(template);
}
----

NOTE: By default, no timeout is set, which can also be achieved by providing `-1` as the timeout value.
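
The decision described above can be sketched in plain Java.
This is an illustration under stated assumptions, not the framework's code: `TimeoutSketch`, `Outcome`, and `onFailure` are hypothetical names, the timeout is assumed to be measured from the first delivery attempt, and the exhaustion of `maxAttempts` is deliberately left out.

[source, java]
----
// Hypothetical helper, for illustration only; not part of Spring Kafka.
final class TimeoutSketch {

    enum Outcome { RETRY, SEND_TO_DLT, STOP_PROCESSING }

    // Decides what happens when the listener throws an exception.
    // A timeout of -1 means that no global timeout is configured.
    static Outcome onFailure(long firstAttemptMillis, long nowMillis, long timeoutMillis, boolean hasDlt) {
        boolean timeoutDisabled = timeoutMillis == -1;
        // Assumption: the timeout counts as reached once the elapsed time equals or exceeds it.
        boolean timedOut = !timeoutDisabled && nowMillis - firstAttemptMillis >= timeoutMillis;
        if (!timedOut) {
            return Outcome.RETRY;
        }
        // Past the timeout: go straight to the DLT, or end processing if there is none.
        return hasDlt ? Outcome.SEND_TO_DLT : Outcome.STOP_PROCESSING;
    }
}
----

With a 5000 ms timeout, a failure 2000 ms after the first attempt is still retried, whereas a failure after 6000 ms goes to the DLT if one is configured.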

[[retry-topic-ex-classifier]]
== Exception Classifier

You can specify which exceptions you want to retry on and which not to.
You can also set it to traverse the causes to look up nested exceptions.

[source, java]
----
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = "true")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    throw new RuntimeException(new MyRetryException()); // will retry
}
----


[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .notRetryOn(MyDontRetryException.class)
            .create(template);
}
----

NOTE: The default behavior is retrying on all exceptions and not traversing causes.
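
The effect of traversing causes can be illustrated with a small plain-Java sketch.
It is a simplification rather than the framework's classifier: `ClassifierSketch` and `shouldRetry` are hypothetical names, and only an include list is modeled.

[source, java]
----
import java.util.List;

// Hypothetical helper, for illustration only; not part of Spring Kafka.
final class ClassifierSketch {

    // Returns true if the thrown exception (or, optionally, one of its causes)
    // matches a type in the include list.
    static boolean shouldRetry(Throwable thrown,
            List<Class<? extends Throwable>> include, boolean traversingCauses) {

        // An empty include list mirrors the documented default: retry on all exceptions.
        if (include.isEmpty()) {
            return true;
        }
        Throwable current = thrown;
        while (current != null) {
            for (Class<? extends Throwable> type : include) {
                if (type.isInstance(current)) {
                    return true;
                }
            }
            // Without traversal, only the top-level exception is inspected.
            current = traversingCauses ? current.getCause() : null;
        }
        return false;
    }
}
----

For a `RuntimeException` that wraps an included exception type, the sketch returns `true` only when `traversingCauses` is enabled, which matches the annotated example above.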

Since 2.8.3 there's a global list of fatal exceptions which will cause the record to be sent to the DLT without any retries.
See xref:kafka/annotation-error-handling.adoc#default-eh[DefaultErrorHandler] for the default list of fatal exceptions.
You can add or remove exceptions to and from this list by overriding the `manageNonBlockingFatalExceptions` method in a `@Configuration` class that extends `RetryTopicConfigurationSupport`.
See xref:retrytopic/retry-config.adoc#retry-topic-global-settings[Configuring Global Settings and Features] for more information.

[source, java]
----
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
    nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
----

NOTE: To disable fatal exceptions' classification, just clear the provided list.


[[include-and-exclude-topics]]
== Include and Exclude Topics

You can decide which topics will and will not be handled by a `RetryTopicConfiguration` bean via the `.includeTopic(String topic)`, `.includeTopics(Collection<String> topics)`, `.excludeTopic(String topic)`, and `.excludeTopics(Collection<String> topics)` methods.

[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .includeTopics(List.of("my-included-topic", "my-other-included-topic"))
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .excludeTopic("my-excluded-topic")
            .create(template);
}
----

NOTE: The default behavior is to include all topics.


[[topics-autocreation]]
== Topics AutoCreation

Unless otherwise specified, the framework auto-creates the required topics using `NewTopic` beans that are consumed by the `KafkaAdmin` bean.
You can specify the number of partitions and the replication factor with which the topics will be created, and you can turn this feature off.
Starting with version 3.0, the default replication factor is `-1`, meaning using the broker default.
If your broker version is earlier than 2.4, you will need to set an explicit value.

IMPORTANT: If you are not using Spring Boot, you must provide a `KafkaAdmin` bean to use this feature.

[source, java]
----
@RetryableTopic(numPartitions = "2", replicationFactor = "3")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@RetryableTopic(autoCreateTopics = "false")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
----

[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .autoCreateTopicsWith(2, 3)
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotAutoCreateRetryTopics()
            .create(template);
}
----

NOTE: By default the topics are autocreated with one partition and a replication factor of -1 (meaning using the broker default).
If your broker version is earlier than 2.4, you will need to set an explicit value.

[[retry-headers]]
== Failure Header Management

When considering how to manage failure headers (original headers and exception headers), the framework delegates to the `DeadLetterPublishingRecoverer` to decide whether to append or replace the headers.

By default, it explicitly sets `appendOriginalHeaders` to `false` and leaves `stripPreviousExceptionHeaders` to the default used by the `DeadLetterPublishingRecoverer`.

This means that only the first "original" and last exception headers are retained with the default configuration.
This is to avoid creation of excessively large messages (due to the stack trace header, for example) when many retry steps are involved.

See xref:kafka/annotation-error-handling.adoc#dlpr-headers[Managing Dead Letter Record Headers] for more information.

To reconfigure the framework to use different settings for these properties, configure a `DeadLetterPublishingRecoverer` customizer by overriding the `configureCustomizers` method in a `@Configuration` class that extends `RetryTopicConfigurationSupport`.
See xref:retrytopic/retry-config.adoc#retry-topic-global-settings[Configuring Global Settings and Features] for more details.

[source, java]
----
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
    customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
        dlpr.setAppendOriginalHeaders(true);
        dlpr.setStripPreviousExceptionHeaders(false);
    });
}
----

Starting with version 2.8.4, if you wish to add custom headers (in addition to the retry information headers added by the factory), you can add a `headersFunction` to the factory: `factory.setHeadersFunction((rec, ex) +++->+++ { +++...+++ })`.

By default, any headers added will be cumulative - Kafka headers can contain multiple values.
Starting with version 2.9.5, if the `Headers` returned by the function contains a header of type `DeadLetterPublishingRecoverer.SingleRecordHeader`, then any existing values for that header will be removed and only the new single value will remain.

[[custom-dlpr]]
== Custom DeadLetterPublishingRecoverer

As can be seen in xref:retrytopic/features.adoc#retry-headers[Failure Header Management], it is possible to customize the default `DeadLetterPublishingRecoverer` instances created by the framework.
However, for some use cases, it is necessary to subclass the `DeadLetterPublishingRecoverer`, for example to override `createProducerRecord()` to modify the contents sent to the retry (or dead-letter) topics.
Starting with version 3.0.9, you can override the `RetryTopicConfigurationSupport.configureDeadLetterPublishingContainerFactory()` method to provide a `DeadLetterPublisherCreator` instance, for example:

[source, java]
----
@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
        configureDeadLetterPublishingContainerFactory() {

    return (factory) -> factory.setDeadLetterPublisherCreator(
            (templateResolver, destinationResolver) ->
                    new CustomDLPR(templateResolver, destinationResolver));
}
----

It is recommended that you use the provided resolvers when constructing the custom instance.

[[exc-based-custom-dlt-routing]]
== Routing of messages to custom DLTs based on thrown exceptions

Starting with version 3.2.0, you can route messages to custom DLTs based on the type of the exception thrown during their processing.
To do so, specify the routing as a set of additional destinations.
Each destination consists of two settings: the `suffix` and the `exceptions`.
When an exception of a type listed in `exceptions` is thrown, the DLT whose name contains the `suffix` is considered as the target topic for the message before the general-purpose DLT is considered.
The following examples show the configuration using either the annotation or a `RetryTopicConfiguration` bean:

[source, java]
----
@RetryableTopic(exceptionBasedDltRouting = {
    @ExceptionBasedDltDestination(
        suffix = "-deserialization", exceptions = {DeserializationException.class}
    )}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
----

[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
            .create(template);
}
----

In the custom DLT name, the `suffix` is placed before the general `dltTopicSuffix`.
In the examples above, a message that caused a `DeserializationException` is routed to `my-annotated-topic-deserialization-dlt` instead of `my-annotated-topic-dlt`.
Custom DLTs are created following the same rules as described in xref:retrytopic/features.adoc#topics-autocreation[Topics AutoCreation].
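
The naming rule can be sketched in plain Java as follows.
This is a simplified illustration, not the framework's resolver: `DltRoutingSketch` and `resolveDltName` are hypothetical names, the exception is matched directly without unwrapping, and the `-dlt` suffix is passed in explicitly, as taken from the topic names above.

[source, java]
----
import java.util.Map;
import java.util.Set;

// Hypothetical helper, for illustration only; not part of Spring Kafka.
final class DltRoutingSketch {

    // Builds the DLT name for a failed record: a matching routing rule inserts its
    // suffix before the general DLT suffix; otherwise the general DLT is used.
    static String resolveDltName(String mainTopic, String dltTopicSuffix,
            Map<String, Set<Class<? extends Throwable>>> routingRules, Throwable thrown) {

        for (Map.Entry<String, Set<Class<? extends Throwable>>> rule : routingRules.entrySet()) {
            for (Class<? extends Throwable> type : rule.getValue()) {
                if (type.isInstance(thrown)) {
                    return mainTopic + rule.getKey() + dltTopicSuffix;
                }
            }
        }
        return mainTopic + dltTopicSuffix;
    }
}
----

Given a rule that maps the `-deserialization` suffix to an exception type, a matching exception resolves to `my-annotated-topic-deserialization-dlt`, and any other exception resolves to `my-annotated-topic-dlt`.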
